package com.cat.paxos.learner;

import com.cat.paxos.proposal.Proposal;

import java.util.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

/**
 * @auther Cat.wang
 * @date 2020/1/16 17:59
 */
public class MemoryLearner implements Learner {

    private LeanerStrategy strategy = new HalfUpLeanerStrategy();

    private List<Learner> others = new ArrayList<>();

    private Queue<LeanProposal> learnProposals = new ArrayDeque<>();

    private Map<String, LeanProposal> hashIndex = new HashMap<>();

    private Lock lock = new ReentrantLock();

    private boolean isLocked;

    @Override
    public PreviewProposalResponse preview(String proposalNumber) {

        if (isLocked) {
            PreviewProposalResponse previewProposal = checkoutPreviewProposal(proposalNumber);
            if (previewProposal != null) return previewProposal;
        }

        try {
            lock.lock();
            isLocked = true;

            PreviewProposalResponse previewProposal = checkoutPreviewProposal(proposalNumber);
            if (previewProposal != null) return previewProposal;

            LeanProposal peek = learnProposals.peek();

            if (peek == null || strategy.decisionProposalNumberOk(peek.proposalNumber(), proposalNumber)) {

                LeanProposal leanProposal = new LeanProposal() {

                    private Proposal proposal;

                    @Override
                    public String proposalNumber() {
                        return proposalNumber;
                    }

                    @Override
                    public Proposal proposal() {
                        return this.proposal;
                    }

                    @Override
                    public void proposal(Proposal proposal) {
                        this.proposal = proposal;
                    }
                };

                List<Learner> learners = new ArrayList<>();
                try {
                    learnProposals.add(leanProposal);
                    hashIndex.put(leanProposal.proposalNumber(), leanProposal);
                    learners.addAll(broadcastPreviews(proposalNumber));
                } catch (Exception e) {
                    learnProposals.remove(leanProposal);
                    hashIndex.remove(leanProposal.proposalNumber());
                }

                return new PreviewProposalResponse() {
                    @Override
                    public String proposalNumber() {
                        return proposalNumber;
                    }

                    @Override
                    public boolean learned() {
                        return false;
                    }

                    @Override
                    public boolean ok() {
                        return true;
                    }

                    @Override
                    public List<Learner> needLearns() {
                        return learners;
                    }
                };
            } else {

                return new PreviewProposalResponse() {
                    @Override
                    public String proposalNumber() {
                        return peek.proposalNumber();
                    }

                    @Override
                    public boolean learned() {
                        return peek.proposal() != null;
                    }

                    @Override
                    public boolean ok() {
                        return false;
                    }

                    @Override
                    public List<Learner> needLearns() {
                        return Collections.EMPTY_LIST;
                    }
                };
            }

        } finally {
            lock.unlock();
            isLocked = false;
        }
    }

    private PreviewProposalResponse checkoutPreviewProposal(String proposalNumber) {
        LeanProposal leanProposal = hashIndex.get(proposalNumber);
        if (leanProposal == null) {
            return null;
        }
        return new PreviewProposalResponse() {
            @Override
            public String proposalNumber() {
                return proposalNumber;
            }

            @Override
            public boolean learned() {
                return leanProposal.proposal() != null;
            }

            @Override
            public boolean ok() {
                return true;
            }

            @Override
            public List<Learner> needLearns() {
                return Collections.EMPTY_LIST;
            }
        };
    }

    @Override
    public <T> LearnProposalResponse learn(Proposal<T> proposal) {
        return internalLearn(proposal, this.others);
    }

    public <T> LearnProposalResponse internalLearn(Proposal<T> proposal, List<Learner> learners) {

        for (LeanProposal leanProposal : learnProposals) {
            if (leanProposal.proposalNumber().equals(proposal.number())) {

                if (leanProposal.proposal() == null) {
                    leanProposal.proposal(proposal);

                    try {
                        learners.stream().parallel().forEach(l -> l.learn(proposal));
                    } catch (Exception e) {}
                }

                return new LearnProposalResponse() {

                    private String proposalNumber = proposal.number();

                    @Override
                    public String proposalNumber() {
                        return proposalNumber;
                    }

                    @Override
                    public boolean ok() {
                        return true;
                    }
                };
            }
        }

        PreviewProposalResponse preview = preview(proposal.number());
        if (preview.ok()) {
            internalLearn(proposal, preview.needLearns());
            return new LearnProposalResponse() {

                private String proposalNumber = preview.proposalNumber();

                @Override
                public String proposalNumber() {
                    return proposalNumber;
                }

                @Override
                public boolean ok() {
                    return true;
                }
            };
        }

        LeanProposal peek = learnProposals.peek();

        return new LearnProposalResponse() {

            private String proposalNumber = peek.proposalNumber();

            @Override
            public String proposalNumber() {
                return proposalNumber;
            }

            @Override
            public boolean ok() {
                return false;
            }
        };
    }

    /**
     * 广播预览
     * @param proposalNumber
     * @return
     */
    private List<Learner> broadcastPreviews(String proposalNumber) {
        return others.stream().parallel().filter(l -> {
            PreviewProposalResponse preview = l.preview(proposalNumber);
            if (!preview.ok()) {
                throw new IllegalArgumentException();
            }
            return !preview.learned();
        }).collect(Collectors.toList());
    }

    @Override
    public List<Learner> others() {
        return this.others;
    }
}
